package com.Long.dtx.seatademo.bank1.message;

import com.Long.dtx.seatademo.bank1.dao.AccountInfoDao;
import com.Long.dtx.seatademo.bank1.entity.AccountChangeEvent;
import com.Long.dtx.seatademo.bank1.service.AccountInfoService;
import com.alibaba.fastjson.JSONObject;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.lookup.JndiDataSourceLookup;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * @Title: 监听消息是否被消息
 * @Description:
 * @Author: guowl
 * @version： 1.0
 * @Date:2021/4/21
 * @Copyright: Copyright(c)2021 RedaFlight.com All Rights Reserved
 */
@Component
@RocketMQTransactionListener(txProducerGroup = "producer_group_txms_bank1")
@Slf4j
public class RocketMQTranisactionListener implements RocketMQLocalTransactionListener {

    @Autowired
    AccountInfoService accountInfoService;
    @Autowired
    AccountInfoDao accountInfoDao;

    /*
     * 事务消息发送成功后的回调
     *
     * */
    //当消息发送成功后此消息才会发生回调
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        String str = new String((byte[]) message.getPayload());
        AccountChangeEvent accountChangeEvent = (AccountChangeEvent) JSONObject.parseObject(str).get("accountChange");
        //执行本地事务，扣减金额
        try {
            accountInfoService.doUpdateAccountBalance(accountChangeEvent);
            //RocketMQLocalTransactionState.COMMIT;自动向mq发送消息 mq将消息设为可执行状态
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("error:{}", e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /*
     * 事务状态的回查
     *
     * */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String str = new String((byte[]) message.getPayload());
        AccountChangeEvent accountChangeEvent = (AccountChangeEvent) JSONObject.parseObject(str).get("accountChange");
        if (accountInfoDao.isExistTx(accountChangeEvent.getTxNo()) > 0) {
            //说明消息已经正确的消息
            return RocketMQLocalTransactionState.COMMIT;
        } else {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}
